package cn.doitedu.etl;

import ch.hsr.geohash.GeoHash;
import com.alibaba.fastjson.JSON;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.ScalarFunction;

import java.util.Map;

/**
 * @Author: deep as the sea
 * @Site: <a href="www.51doit.com">多易教育</a>
 * @QQ: 657270652
 * @Date: 2022/12/9
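 * @Desc: 商城用户行为事件日志数据，公共维度退维处理
 *        (Widens mall user-behavior event logs with the common dimensions: user registration info, GPS area, page info.)
 * 测试数据（通过 kafka 的 producer 往 kafka 中灌入即可）：
 *        (Test data: push the JSON lines below into the Kafka topic `mall-events` with a Kafka producer.)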
{"release_channel":"360应用市场","device_type":"mi6","session_id":"s01","lat":38.089969323508726,"lng":114.35731900345093,"username":"tiger","eventId":"ad_click","eventTime":1670583693000,"properties":{"url":"/content/article/2354.html?a=3","itemId":"item002"}}
{"release_channel":"360应用市场","device_type":"mi6","session_id":"s01","lat":38.089969323508726,"lng":114.35731900345093,"username":"aewen","eventId":"add_cart","eventTime":1670583693000,"properties":{"url":"/content/article/2354.html?a=3","itemId":"item002"}}
{"release_channel":"360应用市场","device_type":"mi7","session_id":"s02","lat":37.82511891440681,"lng":113.14161086395431,"username":"lion","eventId":"add_cart","eventTime":1670583693000,"properties":{"url":"/mall/2354.html?a=3","itemId":"item002"}}
{"release_channel":"华为应用市场","device_type":"mi7","session_id":"s03","lat":41.11620438740027,"lng":111.17957193814095,"username":"lisi","eventId":"add_cart","eventTime":1670583693000,"properties":{"url":"/mall/promotion/2354.html?a=3","itemId":"item002"}}
{"release_channel":"华为应用市场","device_type":"mi8","session_id":"s03","lat":41.11620438740027,"lng":111.17957193814095,"username":"wangwu","eventId":"add_cart","eventTime":1670583693000,"properties":{"url":"/mall/search/2354.html?a=3","itemId":"item002"}}
{"release_channel":"华为应用市场","device_type":"mi8","session_id":"s03","lat":38.089969323508726,"lng":114.35731900345093,"username":"windy","eventId":"add_cart","eventTime":1670583693000,"properties":{"url":"/content/article/2354.html?a=3","itemId":"item002"}}
 **/
public class E01_EtlJob_MallUserEventCommonDim {

    /**
     * Builds and submits the dimension-widening job.
     *
     * <p>Raw mall events are read from Kafka. Each event is widened through three HBase
     * lookup joins (user registration info, geohash area, page info). The widened rows
     * are written to both a Kafka topic and a Doris table.
     *
     * @param args unused
     */
    public static void main(String[] args) {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointStorage("file:/d:/checkpoint");
        env.setParallelism(1);

        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        createEventSourceTable(tEnv);
        createDimensionTables(tEnv);
        createSinkTables(tEnv);

        // UDFs must be registered before any SQL that references them is parsed/validated
        // (the view below uses geo(), the Doris INSERT uses toJson()).
        tEnv.createTemporaryFunction("geo", GeoHashFunction.class);
        tEnv.createTemporaryFunction("toJson", HashMapToJson.class);

        createWideView(tEnv);

        // Widened rows for Doris. Doris has no MAP type, so properties is serialized to a
        // JSON string, and dt is the calendar date of event_time (epoch millis).
        String dorisInsertSql = "INSERT INTO res_dorissink                                      "
                +" SELECT                                                                         "
                +"     gps_province         ,                                                     "
                +"     gps_city             ,                                                     "
                +"     gps_region           ,                                                     "
                +"     TO_DATE(DATE_FORMAT(TO_TIMESTAMP_LTZ(event_time, 3),'yyyy-MM-dd')) as dt,  "
                +"     user_id              ,                                                     "
                +"     username             ,                                                     "
                +"     session_id           ,                                                     "
                +"     event_id             ,                                                     "
                +"     event_time           ,                                                     "
                +"     lat                  ,                                                     "
                +"     lng                  ,                                                     "
                +"     release_channel      ,                                                     "
                +"     device_type          ,                                                     "
                +"     toJson(properties) as properties     ,                                     "
                +"     register_phone       ,                                                     "
                +"     user_status          ,                                                     "
                +"     register_time        ,                                                     "
                +"     register_gender      ,                                                     "
                +"     register_birthday    ,                                                     "
                +"     register_province    ,                                                     "
                +"     register_city        ,                                                     "
                +"     register_job         ,                                                     "
                +"     register_source_type ,                                                     "
                +"     page_type            ,                                                     "
                +"     page_service                                                               "
                +" FROM   wide_view                                                               ";

        // Submit both INSERTs together as ONE job. Calling executeSql("INSERT ...") twice would
        // submit two independent jobs, each consuming the whole topic and repeating every
        // HBase lookup; a statement set lets the planner share the source and the joins.
        tEnv.createStatementSet()
                .addInsertSql("INSERT INTO res_kafkasink  SELECT * from  wide_view")
                .addInsertSql(dorisInsertSql)
                .execute();
    }

    /** Registers the Kafka source table {@code mall_events_kafkasource} (raw JSON user events). */
    private static void createEventSourceTable(StreamTableEnvironment tEnv) {
        tEnv.executeSql(
                " CREATE TABLE mall_events_kafkasource (               "
                        + "     username     string,                            "
                        + "     session_id   string,                            "
                        + "     eventId      string,                            "
                        + "     eventTime    bigint,                            "
                        + "     lat          double,                            "
                        + "     lng          double,                            "
                        + "     release_channel   string,                       "
                        + "     device_type       string,                       "
                        + "     properties   map<string,string>,                "
                        + "     proc_time   AS PROCTIME()                       " // processing-time attribute; the lookup joins below are driven by it
                        + " ) WITH (                                            "
                        + "  'connector' = 'kafka',                             "
                        + "  'topic' = 'mall-events',                           "
                        + "  'properties.bootstrap.servers' = 'doitedu:9092',   "
                        + "  'properties.group.id' = 'com-dim',                 "
                        + "  'scan.startup.mode' = 'earliest-offset',           "
                        + "  'value.format'='json',                             "
                        + "  'value.json.fail-on-missing-field'='false',        "
                        + "  'value.fields-include' = 'EXCEPT_KEY'              "
                        + " )                                                   ");
    }

    /**
     * Registers the three HBase dimension tables used as lookup sources. All attributes
     * live in column family {@code f}:
     * <ul>
     *   <li>{@code ums_member_hbasesource}: user registration info, keyed by username</li>
     *   <li>{@code dim_geo_area_hbasesource}: province/city/region, keyed by geohash</li>
     *   <li>{@code dim_page_info_hbasesource}: page type/service, keyed by URL prefix</li>
     * </ul>
     */
    private static void createDimensionTables(StreamTableEnvironment tEnv) {
        tEnv.executeSql(
                "CREATE TABLE ums_member_hbasesource ( " +
                " username STRING,                              " +
                " f ROW<id INT,phone STRING, status INT, create_time TIMESTAMP(3), gender INT, birthday DATE, province STRING, city STRING, job STRING, source_type INT>, " +
                " PRIMARY KEY (username) NOT ENFORCED           " +
                ") WITH (                                       " +
                " 'connector' = 'hbase-2.2',                    " +
                " 'table-name' = 'dim_user_info',               " +
                " 'zookeeper.quorum' = 'doitedu:2181'           " +
                ")");

        tEnv.executeSql(
                "CREATE TABLE dim_geo_area_hbasesource ( " +
                        " geohash STRING,                                      " +
                        " f ROW<p STRING, c STRING, r STRING>, " +
                        " PRIMARY KEY (geohash) NOT ENFORCED            " +
                        ") WITH (                                       " +
                        " 'connector' = 'hbase-2.2',                    " +
                        " 'table-name' = 'dim_geo_area',                " +
                        " 'zookeeper.quorum' = 'doitedu:2181'           " +
                        ")");

        tEnv.executeSql(
                "CREATE TABLE dim_page_info_hbasesource ( " +
                        " url STRING,                                 " +
                        " f ROW<pt STRING, sv STRING>,    " +
                        " PRIMARY KEY (url) NOT ENFORCED              " +
                        ") WITH (                                     " +
                        " 'connector' = 'hbase-2.2',                  " +
                        " 'table-name' = 'dim_page_info',             " +
                        " 'zookeeper.quorum' = 'doitedu:2181'         " +
                        ")");
    }

    /** Registers the two sinks for the widened rows: Kafka ({@code res_kafkasink}) and Doris ({@code res_dorissink}). */
    private static void createSinkTables(StreamTableEnvironment tEnv) {
        tEnv.executeSql(
                " CREATE TABLE res_kafkasink(                          "
                        + "     user_id           INT,                          "
                        + "     username          string,                       "
                        + "     session_id        string,                       "
                        + "     event_id          string,                       "
                        + "     event_time        bigint,                       "
                        + "     lat               double,                       "
                        + "     lng               double,                       "
                        + "     release_channel   string,                       "
                        + "     device_type       string,                       "
                        + "     properties        map<string,string>,           "
                        + "     register_phone    STRING,                       "
                        + "     user_status       INT,                          "
                        + "     register_time     TIMESTAMP(3),                 "
                        + "     register_gender   INT,                          "
                        + "     register_birthday DATE,                         "
                        + "     register_province STRING,                       "
                        + "     register_city     STRING,                       "
                        + "     register_job      STRING,                       "
                        + "     register_source_type INT,                       "
                        + "     gps_province      STRING,                       "
                        + "     gps_city          STRING,                       "
                        + "     gps_region        STRING,                       "
                        + "     page_type         STRING,                       "
                        + "     page_service      STRING                        "
                        + " ) WITH (                                            "
                        + "  'connector' = 'kafka',                             "
                        + "  'topic' = 'mall-evts-comdim-w',                    "
                        + "  'properties.bootstrap.servers' = 'doitedu:9092',   "
                        + "  'properties.group.id' = 'testGroup',               "
                        + "  'scan.startup.mode' = 'earliest-offset',           "
                        + "  'value.format'='json',                             "
                        + "  'value.json.fail-on-missing-field'='false',        "
                        + "  'value.fields-include' = 'EXCEPT_KEY')             ");

        // properties is STRING rather than VARCHAR(40): it carries the JSON form of the map,
        // which routinely exceeds 40 chars (e.g. {"url":"/content/article/2354.html?a=3","itemId":"item002"}).
        // NOTE(review): the target Doris column must also be wide enough — confirm its DDL.
        tEnv.executeSql(
                " CREATE TABLE res_dorissink(             "
                        +"     gps_province         VARCHAR(16),   "
                        +"     gps_city             VARCHAR(16),   "
                        +"     gps_region           VARCHAR(16),   "
                        +"     dt                   DATE,          "
                        +"     user_id              INT,           "
                        +"     username             VARCHAR(20),   "
                        +"     session_id           VARCHAR(20),   "
                        +"     event_id             VARCHAR(20),   "
                        +"     event_time           bigint,        "
                        +"     lat                  DOUBLE,        "
                        +"     lng                  DOUBLE,        "
                        +"     release_channel      VARCHAR(20),   "
                        +"     device_type          VARCHAR(20),   "
                        +"     properties           STRING,        "
                        +"     register_phone       VARCHAR(20),   "
                        +"     user_status          INT,           "
                        +"     register_time        TIMESTAMP(3),  "
                        +"     register_gender      INT,           "
                        +"     register_birthday    DATE,          "
                        +"     register_province    VARCHAR(20),   "
                        +"     register_city        VARCHAR(20),   "
                        +"     register_job         VARCHAR(20),   "
                        +"     register_source_type INT        ,   "
                        +"     page_type            VARCHAR(20),   "
                        +"     page_service         VARCHAR(20)    "
                        + " ) WITH (                               "
                        + "    'connector' = 'doris',              "
                        + "    'fenodes' = 'doitedu:8030',         "
                        + "    'table.identifier' = 'dwd.mall_evts_comdim_w',  "
                        + "    'username' = 'root',                "
                        + "    'password' = '',                    "
                        + "    'sink.label-prefix' = 'doris_label"+System.currentTimeMillis()+"'"
                        + " )                                         ");
    }

    /**
     * Defines {@code wide_view}: the event stream LEFT-joined (processing-time lookup joins)
     * with the three dimension tables, so events with no matching dimension row are kept.
     *
     * <p>Join keys:
     * <ul>
     *   <li>user: the event's username</li>
     *   <li>geo: {@code REVERSE(geo(lat, lng))}. NOTE(review): this presumably means the
     *       dim_geo_area row keys are stored reversed — confirm.</li>
     *   <li>page: the URL prefix up to and including the last '/', e.g.
     *       {@code /content/article/2354.html?a=3 -> /content/article/}</li>
     * </ul>
     * Requires the {@code geo} UDF to be registered beforehand.
     */
    private static void createWideView(StreamTableEnvironment tEnv) {
        tEnv.executeSql(
                " CREATE TEMPORARY VIEW wide_view AS  SELECT                                  "
                        +"     u.f.id as user_id,                                                      "
                        +"     e.username,                                                             "
                        +"     e.session_id,                                                           "
                        +"     e.eventId as event_id,                                                  "
                        +"     e.eventTime as event_time,                                              "
                        +"     e.lat,e.lng,                                                            "
                        +"     e.release_channel,                                                      "
                        +"     e.device_type,                                                          "
                        +"     e.properties,                                                           "
                        +"     u.f.phone as register_phone,                                            "
                        +"     u.f.status as user_status,                                              "
                        +"     u.f.create_time as register_time,                                       "
                        +"     u.f.gender as register_gender,                                          "
                        +"     u.f.birthday as register_birthday,                                      "
                        +"     u.f.province as register_province,                                      "
                        +"     u.f.city as register_city,                                              "
                        +"     u.f.job as register_job,                                                "
                        +"     u.f.source_type as register_source_type,                                "
                        +"     g.f.p as gps_province,                                                  "
                        +"     g.f.c as gps_city,                                                      "
                        +"     g.f.r as gps_region,                                                    "
                        +"     p.f.pt as page_type,                                                    "
                        +"     p.f.sv as page_service                                                  "
                        +" FROM mall_events_kafkasource AS e                                           "
                        +" LEFT JOIN ums_member_hbasesource FOR SYSTEM_TIME AS OF e.proc_time AS u     "
                        +"     ON e.username = u.username                                              "
                        +" LEFT JOIN dim_geo_area_hbasesource FOR SYSTEM_TIME AS OF e.proc_time AS g   "
                        +"     ON REVERSE(geo(e.lat,e.lng)) = g.geohash                                         "
                        +" LEFT JOIN dim_page_info_hbasesource FOR SYSTEM_TIME AS OF e.proc_time AS p  "
                        +"     ON regexp_extract(e.properties['url'],'(^.*/).*?') = p.url              "
        );
    }


    /**
     * Scalar UDF {@code geo(lat, lng)}: encodes a GPS coordinate as a geohash string.
     *
     * <p>It returns an empty string instead of throwing when either coordinate is
     * {@code null} or out of range. A dirty record then only misses the geo dimension in
     * the LEFT JOIN. NOTE(review): this assumes an empty key never matches a row in
     * dim_geo_area — confirm.
     */
    public static class GeoHashFunction extends ScalarFunction {

        /** Geohash length in characters. NOTE(review): must match the row keys of dim_geo_area — confirm. */
        private static final int PRECISION = 5;

        public String eval(Double lat, Double lng) {
            if (lat == null || lng == null) {
                return "";
            }
            try {
                return GeoHash.geoHashStringWithCharacterPrecision(lat, lng, PRECISION);
            } catch (IllegalArgumentException ignored) {
                // Coordinates outside the valid lat/lng range: treat as "no geo dimension".
                return "";
            }
        }
    }


    /**
     * Scalar UDF {@code toJson(map)}: serializes a string map to a JSON object string.
     * It exists because Doris has no MAP column type.
     */
    public static class HashMapToJson extends ScalarFunction {
        public String eval(Map<String, String> properties) {
            // Without this guard fastjson would emit the literal text "null"; keep SQL NULL instead.
            if (properties == null) {
                return null;
            }
            return JSON.toJSONString(properties);
        }
    }
}
